package io.renren.config;

import io.renren.bo.DelayedTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Delay-queue task manager.
 *
 * <p>Tasks are wrapped in a {@link DelayedTask} and parked in a {@link DelayQueue}. A single
 * daemon dispatcher thread takes each task as it becomes available and hands it to a
 * fixed-size thread pool for execution.
 *
 * <p>The class is annotated with {@code @Component} and also exposes a static
 * {@link #getInstance()} holder, so more than one instance may exist in a process. All running
 * state (started flag, queue, pool, dispatcher thread) is therefore kept per instance, and each
 * instance has to be started before it accepts tasks.
 *
 * <p>NOTE(review): {@link #stop()} is not wired to any destroy callback in this file; the pool
 * threads created by {@code Executors.newFixedThreadPool} stay alive until {@code stop()} is
 * called explicitly.
 *
 * @version created 2018-06-16 15:35:39
 */
@Component
public class TaskQueueBean {

    private static final Logger LOG = LoggerFactory.getLogger(TaskQueueBean.class);

    /**
     * Whether this instance is currently running. Kept per instance (not static) so that it
     * always agrees with this instance's own queue, pool and dispatcher thread.
     */
    private volatile boolean started = false;

    private TaskQueueBean() {
    }

    /**
     * Initialization-on-demand holder for the instance returned by {@link #getInstance()}.
     */
    private static class LazyHolder {
        private static final TaskQueueBean INSTANCE = new TaskQueueBean();
    }

    /**
     * Returns the statically held instance.
     *
     * <p>This instance is created with {@code new} inside the holder class, so it is started
     * only when {@link #start()} is invoked on it by the caller.
     *
     * @return the statically held instance
     */
    public static TaskQueueBean getInstance() {
        return LazyHolder.INSTANCE;
    }

    /**
     * Thread pool that runs the tasks. Volatile because it is replaced in {@link #start()}.
     */
    private volatile ExecutorService executor = null;

    /**
     * Queue of pending delayed tasks. Volatile because {@link #put} and {@link #endTask} read it
     * without holding the lock while {@link #start()} / {@link #stop()} replace it.
     */
    private volatile DelayQueue<DelayedTask<Runnable>> queue = null;

    /**
     * Dispatcher (daemon) thread; only accessed while holding this object's monitor.
     */
    private Thread daemonThread;

    /**
     * Starts the manager: creates the thread pool and the delay queue, then launches the daemon
     * dispatcher thread. Calling it on an instance that is already running does nothing, so at
     * most one dispatcher thread exists per instance.
     */
    @PostConstruct
    public synchronized void start() {
        if (started) {
            return;
        }
        final ExecutorService workers =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        final DelayQueue<DelayedTask<Runnable>> pending = new DelayQueue<>();

        // A Thread object cannot be started twice, so every (re)start gets a fresh one. The
        // dispatcher works on the queue/pool captured here, so stop() clearing the fields can
        // never cause a NullPointerException inside the loop.
        Thread dispatcher = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    execute(pending, workers);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag on the thread that was actually interrupted.
                    Thread.currentThread().interrupt();
                }
            }
        }, "DelayedTask");
        dispatcher.setDaemon(true);

        // Publish the fully built state before flipping the flag that put()/endTask() check.
        executor = workers;
        queue = pending;
        daemonThread = dispatcher;
        started = true;
        dispatcher.start();
        LOG.info("延时任务开启");
    }

    /**
     * Dispatcher loop: blocks on the queue until a task is available and submits it to the pool.
     * Runs until the instance is stopped or the thread is interrupted.
     *
     * @param pending the queue this dispatcher drains
     * @param workers the pool the tasks are submitted to
     * @throws InterruptedException if the thread is interrupted while waiting on the queue
     */
    private void execute(DelayQueue<DelayedTask<Runnable>> pending, ExecutorService workers)
            throws InterruptedException {
        LOG.info("[ task start {} ]:", System.currentTimeMillis());
        while (started) {
            // take() blocks until an element is available; it never returns null.
            DelayedTask<Runnable> next = pending.take();
            Runnable task = next.getTask();
            if (task == null) {
                continue;
            }
            workers.execute(task);
            LOG.info("[ {}  task {} execute  ] ", next.getLockKey(), next.getN());
        }
    }

    /**
     * Adds a task to the delay queue.
     *
     * @param time    time in milliseconds; it is converted to nanoseconds and passed to the
     *                {@link DelayedTask} constructor (presumably the delay before execution -
     *                confirm against {@code DelayedTask})
     * @param task    the task to run
     * @param lockKey key identifying the task; it is passed to the {@link DelayedTask} and logged
     * @return the queued {@link DelayedTask}, which can be passed to {@link #endTask}
     * @throws UnsupportedOperationException if this instance has not been started
     */
    public DelayedTask put(long time, Runnable task, String lockKey) {
        // Read the field once so a concurrent stop() cannot null it between check and use.
        final DelayQueue<DelayedTask<Runnable>> pending = queue;
        if (!started || pending == null) {
            throw new UnsupportedOperationException("请先启动taskQueneBean！");
        }
        // Convert milliseconds to nanoseconds.
        long nanoTime = TimeUnit.NANOSECONDS.convert(time, TimeUnit.MILLISECONDS);
        DelayedTask<Runnable> k = new DelayedTask<Runnable>(nanoTime, task, lockKey);
        pending.put(k);
        LOG.info("[ {} ] 加入队列，当前队列任务数量：{}", lockKey, pending.size());
        return k;
    }

    /**
     * Removes a task from the queue.
     *
     * @param task the task to remove, as returned by {@link #put}
     * @return {@code true} if the task was present in the queue and has been removed
     * @throws UnsupportedOperationException if this instance has not been started
     */
    public boolean endTask(DelayedTask<Runnable> task) {
        // Read the field once so a concurrent stop() cannot null it between check and use.
        final DelayQueue<DelayedTask<Runnable>> pending = queue;
        if (!started || pending == null) {
            throw new UnsupportedOperationException("请先启动taskQueneBean！");
        }
        return pending.remove(task);
    }

    /**
     * Stops the manager manually: interrupts the dispatcher thread, shuts the pool down and
     * drops the queue. Does nothing if this instance is not running. The instance can be
     * started again with {@link #start()}.
     */
    public synchronized void stop() {
        if (!started) {
            return;
        }
        LOG.info("shutdown TaskQueueBean");
        started = false;
        daemonThread.interrupt();
        executor.shutdown();
        daemonThread = null;
        queue = null;
    }
}
